AWS Data Pipelineリリース!
データパイプライン?
AWS Data Pipelineは、パイプラインと呼ばれるデータ駆動のワークフローを管理するWebサービスです。デベロッパーがデータの処理に集中できるように、依存関係の設定や順序実行やスケジュールの管理等を行います。AWS Data Pipeline APIは、基本的な操作を提供していますので、デベロッパーはこれを活用してアプリケーションを構築することができます。パイプラインの作成、スケジューリング、データソースの定義、依存関係、データ変換等の指示が出来ます。
早速使ってみよう
今はus-east-1のみで使えますので、リージョンを切り替えましょう
新規サービスの申込み完了です。
画面からパイプラインを作成する
画面に沿ってパイプラインを作成してみましょう。
パイプライン名等をナビゲージョンに沿って入力していきます。
そしてお絵描きソフト!?
用意されているテンプレート
Data Pipelineにはテンプレートが複数用意されています。
今後はこのテンプレートが流通しそうですね。続きまして、プログラミングしてみましょう〜
主なAPI
以下に主なAPIをご紹介します。
- ActivatePipeline:パイプラインIDを指定し、作成したパイプラインの検証をしてから初期処理を開始します。規約に沿っていなければエラーとなります。
- CreatePipeline:パイプラインを新規に作成します。ユニークIDと名前と説明を記述します。
- DeletePipeline:パイプラインを削除します。削除されたパイプラインは検索したりリストアすることはできません。
- DescribeObjects:パイプライン内に定義されたオブジェクトの情報を表示します。オプジェクとはキーバリュー型で値を持っています。
- DescribePipelines:パイプライン自身の情報を表示します。
- EvaluateExpression:オブジェクトを操作する独自SQLの評価をします。例えば、ある時刻にデータ変換を開始する等です。
- GetPipelineDefinition:設定されたパイプラインを取得します。
- ListPipelines:有効なパイプライン一覧を表示します。
- PollForTask:タスクランナーによって得られる結果をポーリングして取得します。
- PutPipelineDefinition:タスクやスケジュール等のパイプラインの振る舞いを定義します。
- QueryObjects:問合せによってフィルターされたオブジェクト一覧です。
- ReportTaskProgress:タスクランナーの進捗状況を返します。
- ReportTaskRunnerHeartbeat:タスクランナー自体の状態を返します。
- SetStatus:オブジェクトの状態を更新します。
- SetTaskStatus:タスクの状態を更新します。通常は自動で設定されますが、手動で設定することもできます。
- ValidatePipelineDefinition:パイプライン定義が正しいか検証します。
サンプル作成
とりあえずJavaのSDKが出ていましたので妄想で書いてみたいと思います。
import java.util.ArrayList; import java.util.List;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.services.datapipeline.DataPipelineClient; import com.amazonaws.services.datapipeline.model.CreatePipelineRequest; import com.amazonaws.services.datapipeline.model.CreatePipelineResult; import com.amazonaws.services.datapipeline.model.DescribePipelinesRequest; import com.amazonaws.services.datapipeline.model.DescribePipelinesResult; import com.amazonaws.services.datapipeline.model.Field; import com.amazonaws.services.datapipeline.model.PipelineDescription;
public class DataPipeLineSample {
static DataPipelineClient dpClient;
private static void init() throws Exception { AWSCredentials credentials = new PropertiesCredentials( DataPipeLineSample.class.getResourceAsStream("AwsCredentials.properties"));
dpClient = new DataPipelineClient(credentials); }
public static void main(String[] args) throws Exception { init();
String uniqueId = "hogehogehoge-sample"; String name = "classmethod-sample";
CreatePipelineRequest createPipelineRequest = new CreatePipelineRequest(); createPipelineRequest.setUniqueId(uniqueId); createPipelineRequest.setName(name); CreatePipelineResult createPipelineResult = dpClient.createPipeline(createPipelineRequest); String pipelineId = createPipelineResult.getPipelineId(); System.out.println("pipeline ID : " + pipelineId);
List
DescribePipelinesRequest describePipelinesRequest = new DescribePipelinesRequest();
describePipelinesRequest.setPipelineIds(pipelineIds);
DescribePipelinesResult describePipelinesResult = dpClient.describePipelines(describePipelinesRequest);
List
pipeline ID : df-01962283IRRTZGIGVXXX PipelineId : df-01962283IRRTZGIGVSDDD Name : classmethod-sample Description : null Key : @pipelineState RefValue : null StringValue : PENDING Key : name RefValue : null StringValue : classmethod-sample Key : @creationTime RefValue : null StringValue : 2012-12-21T10:16:35 Key : @id RefValue : null StringValue : df-01962283IRRTZGIGVasdf Key : @sphere RefValue : null StringValue : PIPELINE Key : @userId RefValue : null StringValue : 123412814XXX Key : uniqueId RefValue : null StringValue : hogehogehoge-sample Key : @accountId RefValue : null StringValue : 123412814XXX
まとめ
AWS Data Pipelineは要注目のサービスです。様々なオブジェクトを連携させたワークフローに対して、時刻指定でランナーを走らせて結果を得るような流れは、日々変わるビジネスプロセス系アプリケーションの開発にはピッタリですね!次回はもっと深く調べたいと思います。